// CBMTP.cpp : Defines the exported functions for the DLL application.
//

#ifndef BMTP_EXPORTS
#define BMTP_EXPORTS 1
#endif

#include "stdafx2.h"
#include "bmtp2.h"
#include "log.h"

#define BMTP_ERROR(p, err_no) \
bmtp_log_add(p,"%s, Line %d\n", __FILE__, __LINE__); bmtp_on_error(p, err_no);

void bmtp_close(struct bmtp_context *bmtp);

int bmtp_send_conn(struct bmtp_context *bmtp);
int bmtp_send_puback(struct bmtp_context *bmtp, unsigned char sid);
int bmtp_send_ping(struct bmtp_context *bmtp);
int bmtp_send_disconn(struct bmtp_context *bmtp);
int bmtp_send_pub(struct bmtp_context *bmtp, unsigned char *msg, int len, unsigned char qos, unsigned char dup);

void bmtp_on_open( BMTP *p ) {}
void bmtp_on_close( BMTP *p ) {}
void bmtp_on_pub( BMTP *p, BMTP_MSG *msg ) {}
void bmtp_on_ack( BMTP *p, BMTP_PACKAGE *msg, int status ) {}
void bmtp_on_error( BMTP *p, int err_no ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;
    
    bmtp_log_add(bmtp,"======== %s:%d error dump %d ========\n",
        inet_ntoa(bmtp->server_addr.sin_addr),
        ntohs(bmtp->server_addr.sin_port),
        err_no);
    
    bmtp_log_add(bmtp,"recv_length: %u\nrecv_offset: %u\nrecv_buf:\n",
        bmtp->recv_length,
        bmtp->recv_offset);
    
    unsigned int i;
    for(i=0;i<bmtp->recv_length;i++) {
        bmtp_log_add(bmtp,"%02X", bmtp->recv_buf[i]);
    }

    //unsigned int  page[8];

    //struct bmtp_msg wait_queue;
    //struct bmtp_msg send_queue;
    //struct bmtp_msg ack_queue;
    
    bmtp_log_add(bmtp,"==================================\n");
    
    // 调用自定义回调
    if( bmtp->on_error != bmtp_on_error ) {
        bmtp->on_error( p, err_no );
    }

    switch( err_no ) {
        default:
            // 发生 error 后，跳出循环重建连接
            event_base_loopbreak( bmtp->base );
    }
}

#ifdef WIN32
void bmtp_mutex_init( struct bmtp_context *bmtp ) {
    InitializeCriticalSection( &bmtp->cs );
}

void bmtp_mutex_lock( struct bmtp_context *bmtp ) {
    EnterCriticalSection( &bmtp->cs );
}

void bmtp_mutex_unlock( struct bmtp_context *bmtp ) {
    LeaveCriticalSection( &bmtp->cs );
}
#else
void bmtp_mutex_init( struct bmtp_context *bmtp ) {
    pthread_mutex_init( &bmtp->cs, NULL );
}

void bmtp_mutex_lock( struct bmtp_context *bmtp ) {
    pthread_mutex_lock( &bmtp->cs );
}

void bmtp_mutex_unlock( struct bmtp_context *bmtp ) {
    pthread_mutex_unlock( &bmtp->cs );
}
#endif

int bmtp_param_parser(struct bmtp_context *bmtp, struct bmtp_param_list param_table[16]) {
    if( bmtp->op_type != TYPE_STRING ) {
        return BMTP_OK;
    }
    
    int state = STATE_START;
    int offset = 0;
    struct bmtp_param_list param;
    unsigned char *buf = bmtp->op_value.s;
    unsigned char c;
    
    while( 1 ) {
        switch(state) {
            case STATE_START:
                memset(&param, 0, sizeof(param));
                
                state = STATE_KEY;
            case STATE_KEY:
                if( offset + 1 > bmtp->op_value.l ) {
                    goto end;
                }
                
                c = buf[offset++];
                
                param.key = ( c & 0x7F ) >> 3;
                param.key_type = c;
                
                if( c & 0x80 ) {
                    state = STATE_KEY_EXT1;
                } else {
                    state = STATE_VALUE;
                }
                break;
            case STATE_KEY_EXT1:
            case STATE_KEY_EXT2:
            case STATE_KEY_EXT3:
            case STATE_KEY_EXT4:
                if( offset + 1 > bmtp->op_value.l ) {
                    goto end;
                }

                c = buf[offset++];
                
                param.key += ( c & 0x7F ) << ( 4 + 7 * ( state - STATE_KEY_EXT1 ) );
                
                if( c & 0x80 ) {
                    if( state == STATE_KEY_EXT4 ) {
                        return BMTP_UNKNOWN_ERROR;
                    } else {
                        state ++;
                    }
                } else {
                    state = STATE_VALUE;
                }
                break;
            case STATE_VALUE:
                switch( param.key_type ) {
                    case TYPE_BOOL:   state = STATE_VALUE_BOOL;   break;
                    case TYPE_VARINT: state = STATE_VALUE_VARINT; break;
                    case TYPE_64BIT:  state = STATE_VALUE_64BIT;  break;
                    case TYPE_STRING: state = STATE_VALUE_VARINT; break;
                    default:          return BMTP_UNKNOWN_ERROR;
                }
                break;
            case STATE_VALUE_BOOL:
                param.value.l = 1;
                
                state = STATE_END;
                break;
            case STATE_VALUE_VARINT:
                if( offset + 1 > bmtp->op_value.l ) {
                    goto end;
                }

                c = buf[offset++];
                
                param.value.l = ( c & 0x7F );
                
                if( c & 0x80 ) {
                    state = STATE_VALUE_VARINT_EXT1;
                } else {
                    state = STATE_END;
                }
                break;
            case STATE_VALUE_VARINT_EXT1:
            case STATE_VALUE_VARINT_EXT2:
            case STATE_VALUE_VARINT_EXT3:
            case STATE_VALUE_VARINT_EXT4:
            case STATE_VALUE_VARINT_EXT5:
            case STATE_VALUE_VARINT_EXT6:
            case STATE_VALUE_VARINT_EXT7:
            case STATE_VALUE_VARINT_EXT8:
            case STATE_VALUE_VARINT_EXT9:
                if( offset + 1 > bmtp->op_value.l ) {
                    goto end;
                }

                c = buf[offset++];
                
                param.value.l += ( c & 0x7F ) << ( ( state - STATE_VALUE_VARINT ) * 7 );
                
                if( c & 0x80 ) {
                    if( state == STATE_VALUE_VARINT_EXT9 ) {
                        return BMTP_UNKNOWN_ERROR;
                    } else {
                        state += 1;
                    }
                } else {
                    if( param.key_type == TYPE_STRING ) {
                        state = STATE_VALUE_STRING;
                    } else {
                        state = STATE_END;
                    }
                }
                break;
            case STATE_VALUE_STRING:
                if( offset + param.value.l > bmtp->op_value.l ) {
                    goto end;
                }

                param.value.s = buf + offset;
                
                offset += param.value.l;

                state = STATE_END;
                break;
            case STATE_VALUE_64BIT:
                if( offset + 8 > bmtp->op_value.l ) {
                    goto end;
                }
                
                param.value.l = (unsigned long long int)( buf + offset );
                
                offset += 8;

                state = STATE_END;
                break;

            case STATE_END:
                if( param.key >=0 && param.key < 16 ) {
                    param_table[param.key] = param;
                    state = STATE_START;
                } else {
                    goto end;
                }
                break;
            default:
                return BMTP_UNKNOWN_ERROR;
        }
    }
    
    end:
    if( state != STATE_KEY ) {
        return BMTP_UNKNOWN_ERROR;
    } else {
        return BMTP_OK;
    }
}

int bmtp_on_connack(struct bmtp_context *bmtp) {
    bmtp_log_add(bmtp,"bmtp conn ok\n");
    
    int status = 0;
    switch( status ) {
        case 0:
            bmtp->on_open( bmtp );
            return BMTP_OK;
        default:
            BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
            return BMTP_UNKNOWN_ERROR;
    }

    return BMTP_OK;
}

int bmtp_on_publish(struct bmtp_context *bmtp) {
    bmtp_log_add(bmtp,"received %s:%d - %.*s ok\n",
        inet_ntoa(bmtp->server_addr.sin_addr),
        ntohs(bmtp->server_addr.sin_port),
        bmtp->payload_length > 256 ? 256 : bmtp->payload_length,
        (char *)bmtp->payload);

    BMTP_MSG msg;
    msg.data = (const char *)bmtp->payload;
    msg.data_len = bmtp->payload_length;

    bmtp->on_pub(bmtp, &msg);

    return BMTP_OK;
}

int bmtp_on_puback(struct bmtp_context *bmtp) {
    struct bmtp_param_list param_table[16];
    memset(param_table, 0, sizeof(param_table));
    
    if( bmtp_param_parser( bmtp, param_table ) != BMTP_OK ) {
        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
        return BMTP_UNKNOWN_ERROR;
    }
    
    unsigned long long int stream_id = 0;
    unsigned int status = 0;
    if( param_table[PARAM_PUBACK_STREAM_ID].key == PARAM_PUBACK_STREAM_ID &&
        ( param_table[PARAM_PUBACK_STREAM_ID].key_type == TYPE_VARINT || param_table[PARAM_PUBACK_STREAM_ID].key_type == TYPE_64BIT ) ) {
        stream_id = param_table[PARAM_PUBACK_STREAM_ID].value.l;
    } else {
        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
        return BMTP_UNKNOWN_ERROR;
    }

    if( param_table[PARAM_PUBACK_STATUS].key == PARAM_PUBACK_STATUS &&
        ( param_table[PARAM_PUBACK_STATUS].key_type == TYPE_VARINT || param_table[PARAM_PUBACK_STATUS].key_type == TYPE_64BIT ) ) {
        status = param_table[PARAM_PUBACK_STATUS].value.l;
    } else {
        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
        return BMTP_UNKNOWN_ERROR;
    }
    
    //printf("%lld\n", stream_id);
    
    // 搜索 ack_queue
    int b_flag = 0;
    struct bmtp_package *msg_node;
    list_for_each_entry(msg_node, struct bmtp_package, &bmtp->ack_queue.head, head) {
        //bmtp_log_add(bmtp, "%u %lld %d %d\n", msg_node->op_type, msg_node->stream_id, stream_id, status);
        if (msg_node->op_type == OP_PUB && msg_node->stream_id == stream_id ) {
            b_flag = 1;
            switch( status ) {
                case RET_PUB_OK:
                    //bmtp_log_add(bmtp,"pub %s:%d - %lld - %u %u ok\n",
                    //    inet_ntoa(bmtp->server_addr.sin_addr),
                    //    ntohs(bmtp->server_addr.sin_port),
                    //    stream_id,
                    //    bmtp->op_value.s[1], bmtp->op_value.s[2]);

                    bmtp->on_ack( bmtp, msg_node, status );
                    break;
                default:
                    bmtp->on_ack( bmtp, msg_node, status );
                    break;
            }

            list_del(&msg_node->head);
            bmtp_package_delete(msg_node);
            break;
        }
    }
    if( !b_flag ) {
        //bmtp_log_add(bmtp,"pub %s:%d - %lld not found\n",
        //    inet_ntoa(bmtp->server_addr.sin_addr),
        //    ntohs(bmtp->server_addr.sin_port),
        //    stream_id);
        //BMTP_ERROR( bmtp, BMTP_SID_UNMATCH );
        //return BMTP_UNKNOWN_ERROR;
    }
    
    /*int i = 0;
    list_for_each_entry(msg_node, struct bmtp_msg, &bmtp->ack_queue.head, head) {
        i ++;
    }
    printf("%d left\n", i);
    if(i==1){
        list_for_each_entry(msg_node, struct bmtp_msg, &bmtp->ack_queue.head, head) {
            printf("%lld m left\n", msg_node->stream_id);
        }
    }*/

    return BMTP_OK;
}

int bmtp_on_suback(struct bmtp_context *bmtp) {
    struct bmtp_param_list param_table[16];
    memset(param_table, 0, sizeof(param_table));
    
    if( bmtp_param_parser( bmtp, param_table ) != BMTP_OK ) {
        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
        return BMTP_UNKNOWN_ERROR;
    }
    
    unsigned long long int channel_id = 0;
    unsigned int status = 0;

    if( param_table[PARAM_SUBACK_CHANNEL_ID].key == PARAM_SUBACK_CHANNEL_ID &&
        ( param_table[PARAM_SUBACK_CHANNEL_ID].key_type == TYPE_VARINT || param_table[PARAM_SUBACK_CHANNEL_ID].key_type == TYPE_64BIT ) ) {
        channel_id = param_table[PARAM_SUBACK_CHANNEL_ID].value.l;
    } else {
        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
        return BMTP_UNKNOWN_ERROR;
    }

    if( param_table[PARAM_SUBACK_STATUS].key == PARAM_SUBACK_STATUS &&
        ( param_table[PARAM_SUBACK_STATUS].key_type == TYPE_VARINT || param_table[PARAM_SUBACK_STATUS].key_type == TYPE_64BIT ) ) {
        status = param_table[PARAM_SUBACK_STATUS].value.l;
    } else {
        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
        return BMTP_UNKNOWN_ERROR;
    }
    
    switch( status ) {
        case RET_SUB_OK:
            bmtp_log_add(bmtp,"sub %lld@%s:%d ok\n",
                channel_id,
                inet_ntoa(bmtp->server_addr.sin_addr),
                ntohs(bmtp->server_addr.sin_port));

            //bmtp->on_ack( bmtp, msg_node, status );
            break;
        default:
            bmtp_log_add(bmtp,"sub %lld@%s:%d failed\n",
                channel_id,
                inet_ntoa(bmtp->server_addr.sin_addr),
                ntohs(bmtp->server_addr.sin_port));

            //bmtp->on_ack( bmtp, msg_node, status );
            break;
    }

    // 搜索 ack_queue

    /*TODO 流标识符不匹配*/

    return BMTP_OK;
}

int bmtp_on_pingack(struct bmtp_context *bmtp) {
    // nothing to do
    return BMTP_OK;
}

void bmtp_on_recv(struct bmtp_context *bmtp) {
    unsigned char c;
    while (1) {
        switch (bmtp->state) {
            case STATE_START:
                bmtp->state = STATE_KEY;
                break;
            case STATE_KEY:
                if( bmtp->recv_offset + 1 > bmtp->recv_length ) {
                    return;
                }

                c = bmtp->recv_buf[bmtp->recv_offset ++];

                bmtp->op_code = ( c >> 3 ) & 0xF;
                bmtp->op_type = c;

                if( c & 0x80 ) {
                    bmtp->state = STATE_KEY_EXT1;
                } else {
                    bmtp->state = STATE_VALUE;
                }
                break;
            case STATE_KEY_EXT1:
            case STATE_KEY_EXT2:
            case STATE_KEY_EXT3:
            case STATE_KEY_EXT4:
                if( bmtp->recv_offset + 1 > bmtp->recv_length ) {
                    return;
                }

                c = bmtp->recv_buf[bmtp->recv_offset ++];

                bmtp->op_code += ( c & 0x7F ) << ( 4 + ( bmtp->state - STATE_KEY_EXT1 ) * 7 );
                
                if( c & 0x80 ) {
                    if( bmtp->state == STATE_KEY_EXT4 ) {
                        // key 最长不能超过 32 位
                        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
                        return;
                    } else {
                        bmtp->state += 1;
                    }
                } else {
                    bmtp->state = STATE_VALUE;
                }
                break;
            case STATE_VALUE:
                switch( bmtp->op_type ) {
                    case TYPE_BOOL:   bmtp->state = STATE_VALUE_BOOL;   break;
                    case TYPE_VARINT: bmtp->state = STATE_VALUE_VARINT; break;
                    case TYPE_64BIT:  bmtp->state = STATE_VALUE_64BIT;  break;
                    case TYPE_STRING: bmtp->state = STATE_VALUE_VARINT; break;
                    default: BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR ); return;
                }
                break;
            case STATE_VALUE_BOOL:
                bmtp->op_value.l = 1;
                
                bmtp->state = STATE_PAYLOAD_LENGTH;
                break;
            case STATE_VALUE_64BIT:
                if( bmtp->recv_offset + 8 > bmtp->recv_length ) {
                    return;
                }
                
                bmtp->op_value.l = (unsigned long long int)( bmtp->recv_buf + bmtp->recv_offset );
                
                bmtp->recv_offset += 8;

                bmtp->state = STATE_PAYLOAD_LENGTH;
                break;
            case STATE_VALUE_VARINT:
                if( bmtp->recv_offset + 1 > bmtp->recv_length ) {
                    return;
                }
                
                c = bmtp->recv_buf[bmtp->recv_offset ++];
                
                bmtp->op_value.l = ( c & 0x7F );
                
                if( c & 0x80 ) {
                    bmtp->state = STATE_VALUE_VARINT_EXT1;
                } else {
                    if( bmtp->op_type == TYPE_STRING ) {
                        bmtp->state = STATE_VALUE_STRING;
                    } else {
                        bmtp->state = STATE_PAYLOAD_LENGTH;
                    }
                }
                break;
            case STATE_VALUE_VARINT_EXT1:
            case STATE_VALUE_VARINT_EXT2:
            case STATE_VALUE_VARINT_EXT3:
            case STATE_VALUE_VARINT_EXT4:
            case STATE_VALUE_VARINT_EXT5:
            case STATE_VALUE_VARINT_EXT6:
            case STATE_VALUE_VARINT_EXT7:
            case STATE_VALUE_VARINT_EXT8:
            case STATE_VALUE_VARINT_EXT9:
                if( bmtp->recv_offset + 1 > bmtp->recv_length ) {
                    return;
                }
                
                c = bmtp->recv_buf[bmtp->recv_offset ++];
                
                bmtp->op_value.l += ( c & 0x7F ) << ( ( bmtp->state - STATE_VALUE_VARINT ) * 7 );
                
                if( c & 0x80 ) {
                    if( bmtp->state == STATE_VALUE_VARINT_EXT9 ) {
                        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
                        return;
                    } else {
                        bmtp->state += 1;
                    }
                } else {
                    if( bmtp->op_type == TYPE_STRING ) {
                        bmtp->state = STATE_VALUE_STRING;
                    } else {
                        bmtp->state = STATE_PAYLOAD_LENGTH;
                    }
                }
                break;
            case STATE_VALUE_STRING:
                if( bmtp->recv_offset + bmtp->op_value.l > bmtp->recv_length ) {
                    return;
                }
                
                bmtp->op_value.s = (unsigned char *)(bmtp->recv_buf + bmtp->recv_offset );
                
                bmtp->recv_offset += bmtp->op_value.l;

                bmtp->state = STATE_PAYLOAD_LENGTH;
                break;
            case STATE_PAYLOAD_LENGTH:
                if( bmtp->op_code != OP_PUB ) {
                    bmtp->state = STATE_END;
                    break;
                }

                if( bmtp->recv_offset + 1 > bmtp->recv_length ) {
                    return;
                }
                
                c = bmtp->recv_buf[bmtp->recv_offset ++];
                
                bmtp->payload_length = ( c & 0x7F );                

                if( c & 0x80 ) {
                    bmtp->state = STATE_PAYLOAD_LENGTH_EXT1;
                } else {
                    bmtp->state = STATE_PAYLOAD;
                }
                break;
            case STATE_PAYLOAD_LENGTH_EXT1:
            case STATE_PAYLOAD_LENGTH_EXT2:
            case STATE_PAYLOAD_LENGTH_EXT3:
            case STATE_PAYLOAD_LENGTH_EXT4:
                if( bmtp->recv_offset + 1 > bmtp->recv_length ) {
                    return;
                }
                
                c = bmtp->recv_buf[bmtp->recv_offset ++];
                
                bmtp->payload_length += ( c & 0x7F ) << ( ( bmtp->state - STATE_PAYLOAD_LENGTH ) * 7 );                

                if( c & 0x80 ) {
                    if( bmtp->state == STATE_PAYLOAD_LENGTH_EXT4 ) {
                        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
                        return;
                    } else {
                        bmtp->state += 1;
                    }
                } else {
                    bmtp->state = STATE_PAYLOAD;
                }
                break;
            case STATE_PAYLOAD:
                if( bmtp->recv_offset + bmtp->payload_length > bmtp->recv_length ) {
                    return;
                }
                
                bmtp->payload = (unsigned char *)(bmtp->recv_buf + bmtp->recv_offset );
                
                bmtp->recv_offset += bmtp->payload_length;

                bmtp->state = STATE_END;
                break;
            case STATE_END:
                //printf("recv_offset: %u\n", bmtp->recv_offset);
                switch( bmtp->op_code ) {
                    case OP_CONNACK:
                        if( bmtp_on_connack(bmtp) != BMTP_OK ) {
                            return;
                        }
                        break;
                    case OP_PUB:
                        if( bmtp_on_publish(bmtp) != BMTP_OK ) {
                            return;
                        }
                        break;
                    case OP_PUBACK:
                        if( bmtp_on_puback(bmtp) != BMTP_OK ) {
                            return;
                        }
                        break;
                    case OP_SUBACK:
                        if( bmtp_on_suback(bmtp) != BMTP_OK ) {
                            return;
                        }
                        break;
                    case OP_PINGACK:
                        break;
                    default:
                        // 指令不存在
                        BMTP_ERROR( bmtp, BMTP_UNKNOWN_ERROR );
                        return;
                }

                if (bmtp->recv_offset >= 4096) {
                    memmove(bmtp->recv_buf, bmtp->recv_buf + bmtp->recv_offset, bmtp->recv_length - bmtp->recv_offset);
                    bmtp->recv_buf = (unsigned char *)realloc(bmtp->recv_buf, bmtp->recv_length - bmtp->recv_offset);
                    bmtp->recv_length -= bmtp->recv_offset;
                    bmtp->recv_offset = 0;
                }
                
                bmtp->state = STATE_START;
                break;
            default:
                BMTP_ERROR( bmtp, BMTP_UNRECOGNIZED_STATE );
                return;
        }
    }
}

ssize_t bmtp_recv(struct bmtp_context *bmtp, void *buf, size_t len) {
    int ret;
    bmtp_err_t err;

    bmtp_set_errno(0);

    if(bmtp->ssl) {
        ret = SSL_read(bmtp->ssl, buf, len);
        if(ret <= 0) {
            err = SSL_get_error(bmtp->ssl, ret);
            if(err == SSL_ERROR_WANT_READ) {
                ret = -1;
                bmtp_set_errno(BMTP_EAGAIN);
            } else if(err == SSL_ERROR_WANT_WRITE) {
                ret = -1;
                //ev->want_write = true;
                bmtp_set_errno(BMTP_EAGAIN);
            } else {
                bmtp_set_errno(BMTP_ENOPROTOOPT);
            }
        }
        return (ssize_t )ret;
    }else{
        /* Call normal read/recv */
        return recv(bmtp->fd, buf, len, 0);
    }
}

ssize_t bmtp_send(struct bmtp_context *bmtp, void *buf, size_t len) {
    int ret;
    bmtp_err_t err;

    bmtp_set_errno(0);

    if(bmtp->ssl){
        //ev->want_write = false;
        ret = SSL_write(bmtp->ssl, buf, len);
        if(ret < 0){
            err = SSL_get_error(bmtp->ssl, ret);
            if(err == SSL_ERROR_WANT_READ){
                ret = -1;
                bmtp_set_errno(BMTP_EAGAIN);
            }else if(err == SSL_ERROR_WANT_WRITE){
                ret = -1;
                //mosq->want_write = true;
                bmtp_set_errno(BMTP_EAGAIN);
            }else{
                bmtp_set_errno(BMTP_ENOPROTOOPT);
            }
        }
        return (ssize_t )ret;
    }else{
        /* Call normal write/send */
        return send(bmtp->fd, buf, len, 0);
    }
}

void bmtp_write_cb(evutil_socket_t sock, short flags, void * args) {
    struct bmtp_context *bmtp = (struct bmtp_context *)args;

    switch (flags) {
    case EV_TIMEOUT:
        break;
    case EV_WRITE:
        while (!list_empty(&bmtp->send_queue.head)) {
            struct bmtp_package *msg_node = list_first_entry(&bmtp->send_queue.head, struct bmtp_package, head);
            
            /*if( msg_node->stream_id ) {
            bmtp_log_add(bmtp,"pubing %s:%d - %lld\n",
                inet_ntoa(bmtp->server_addr.sin_addr),
                ntohs(bmtp->server_addr.sin_port),
                msg_node->stream_id);
            
                int i;
                for(i = msg_node->send_offset; i < msg_node->send_buf_len; i++) {
                   bmtp_log_add(bmtp,"%x ", msg_node->send_buf[i]);
                }
                bmtp_log_add(bmtp,"\n");
            }*/
            
            int ret = bmtp_send(bmtp, (char *)msg_node->send_buf + msg_node->send_offset, msg_node->send_buf_len - msg_node->send_offset);
            if (ret == 0) {
                /*connection closed*/
                msg_node->send_offset = 0;
                break;
            }
            else if (ret < 0) {
                if (bmtp_socket_errno == BMTP_EAGAIN) {
                    struct timeval tv = { 15, 0 };
                    event_add(bmtp->event_write, &tv);
                } else {
                    msg_node->send_offset = 0;
                }
                break;
            }
            else {
                if (ret + msg_node->send_offset >= msg_node->send_buf_len) {
                    bmtp_mutex_lock( bmtp );
                    list_del(&msg_node->head);
                    bmtp_mutex_unlock( bmtp );
                    if ((msg_node->op_code == OP_PUB && msg_node->stream_id > 0) ||
                        (msg_node->op_code == OP_SUB)) {
                        //static int i = 0;
                        //i ++;
                        //printf("%d\n", i);
                        list_add_tail(&msg_node->head, &bmtp->ack_queue.head);
                    } else {
                        //printf("%u %lld deleted\n",msg_node->op_code, msg_node->stream_id);
                        bmtp_package_delete(msg_node);
                    }
                }
                else {
                    msg_node->send_offset += ret;
                }
            }
        }
        break;
    }
}

void bmtp_read_cb(evutil_socket_t sock, short flags, void * args) {
    struct bmtp_context *bmtp = (struct bmtp_context *)args;
    char recv_buf[10240];
    struct timeval tv = { 30, 0 };
    int ret;

    switch (flags) {
    case EV_TIMEOUT:
        bmtp_ping(bmtp);
        event_add(bmtp->event_read, &tv);
        break;
    case EV_READ:
        while (1) {
            ret = bmtp_recv(bmtp, recv_buf, 10240);
            if (ret == 0) {
                /*connection closed*/
                break;
            }
            else if (ret < 0) {
                if (bmtp_socket_errno == BMTP_EAGAIN) {
                    event_add(bmtp->event_read, &tv);
                }
                break;
            }
            else {
                void *p = realloc(bmtp->recv_buf, bmtp->recv_length + ret);
                if (p == NULL) {
                    BMTP_ERROR( bmtp, BMTP_OUT_OF_MEMORY );
                    break;
                }
                else {
                    bmtp->recv_buf = (unsigned char *)p;
                    memcpy(bmtp->recv_buf + bmtp->recv_length, recv_buf, ret);
                    bmtp->recv_length += ret;

                    bmtp_on_recv(bmtp);
                }
            }
        }
        break;
    }
}

#ifdef WIN32
unsigned int __stdcall bmtp_main_thread(LPVOID lpParam) {
#else
void* bmtp_main_thread( void* lpParam ) {
#endif
    struct bmtp_context *bmtp = (struct bmtp_context*)lpParam;

    event_base_dispatch(bmtp->base);

    bmtp_close(bmtp);

#ifdef _DEBUG
    _CrtDumpMemoryLeaks();
#endif

    return 0;
}

static SSL_CTX *ssl_ctx;

void bmtp_close(struct bmtp_context *bmtp) {
    if( bmtp->fd > 0 ) {
        if( bmtp->ssl ) {
            SSL_shutdown(bmtp->ssl); 
            SSL_free(bmtp->ssl);
            bmtp->ssl = NULL;
        }
        evutil_closesocket( bmtp->fd );
        bmtp->fd = -1;
    }

    if (bmtp->recv_buf) {
        free(bmtp->recv_buf);
        bmtp->recv_buf = NULL;
        bmtp->recv_length = 0;
        bmtp->recv_offset = 0;
    }

    if (bmtp->event_read) {
        event_del(bmtp->event_read);
        bmtp->event_read = NULL;
    }

    if (bmtp->event_write) {
        event_del(bmtp->event_write);
        bmtp->event_write = NULL;
    }

    if (bmtp->base) {
        event_base_free(bmtp->base);
        bmtp->base = NULL;
    }
    
    // Bugfix: 由于没有重置状态机，导致断线重连后可能无法正常处理数据
    bmtp->state = STATE_START;
    
    bmtp_mutex_lock( bmtp );
    
    // Bugfix: 有时候会出现消息发送到一半被中止的情况，必须修正 offset
    struct bmtp_package *msg_node;
    list_for_each_entry(msg_node, struct bmtp_package, &bmtp->send_queue.head, head) {
        msg_node->send_offset = 0;
    }

    // 把 ack_queue 中的消息全部放到 send_queue 末尾
    while (!list_empty(&bmtp->ack_queue.head)) {
        struct bmtp_package *msg_node = list_first_entry(&bmtp->ack_queue.head, struct bmtp_package, head);
        // Bugfix: 重新发送的消息必须重新设置 offset
        msg_node->send_offset = 0;
        list_del(&msg_node->head);
        list_add_tail(&msg_node->head, &bmtp->send_queue.head);
        // TODO 设置重发标志
    }
    
    // TODO 连接断开后，所有未发送的确认消息将失效，应当被移除
    
    bmtp_mutex_unlock( bmtp );
    
    bmtp_log_add(bmtp,"%s:%d closed.\n",
        inet_ntoa(bmtp->server_addr.sin_addr),
        ntohs(bmtp->server_addr.sin_port));
    
    bmtp->on_close( bmtp );
}

BMTP_API int bmtp_init() {
#ifdef WIN32
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != NO_ERROR) {
        return 1;
    }
#endif

#ifdef WIN32
    evthread_use_windows_threads();
#else  
    evthread_use_pthreads();
#endif

    // Initialize OpenSSL
    SSL_load_error_strings();
    SSL_library_init();

    ssl_ctx = SSL_CTX_new(SSLv23_method());
    if(!ssl_ctx) {
        return 1;
    }

    return BMTP_OK;
}

BMTP_API void bmtp_cleanup() {
#ifdef WIN32
    WSACleanup();
#endif
}

BMTP_API BMTP* bmtp_new(const char *ip, int port, int secure) {
    struct bmtp_context *bmtp = (struct bmtp_context*)calloc(1, sizeof(struct bmtp_context));
    if( bmtp == NULL ) {
        return NULL;
    }
    
    // TODO 增加域名解析

    bmtp->server_addr.sin_family = AF_INET;
    bmtp->server_addr.sin_port = htons(port);
#ifdef WIN32
    bmtp->server_addr.sin_addr.S_un.S_addr = inet_addr(ip);
#else
    bmtp->server_addr.sin_addr.s_addr = inet_addr(ip);
#endif
    memset(bmtp->server_addr.sin_zero, 0x00, 8);

    if( bmtp_log_init(bmtp) != 0 ) {
        return NULL;
    }

    bmtp->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (connect(bmtp->fd, (struct sockaddr*)&bmtp->server_addr, sizeof(bmtp->server_addr)) == -1) {
        bmtp_log_add(bmtp,"connect to %s:%d, failed.\n", ip, port);
        return NULL;
    }
    
    bmtp->secure = secure;

    if( bmtp->secure ) {
        bmtp->ssl = SSL_new(ssl_ctx);
        if(!bmtp->ssl) {
            evutil_closesocket(bmtp->fd);
            bmtp_log_add(bmtp,"SSL_new, failed.\n");
            return NULL;
        }
        
        SSL_set_fd(bmtp->ssl, bmtp->fd);
        
        if( SSL_connect(bmtp->ssl) != 1 ) {
            evutil_closesocket(bmtp->fd);
            bmtp_log_add(bmtp,"SSL_connect, failed.\n");
            return NULL;
        }
    }

    evutil_make_socket_nonblocking( bmtp->fd );
    
    bmtp->base = event_base_new();
    bmtp->event_write = event_new(bmtp->base, bmtp->fd, EV_TIMEOUT | EV_WRITE, bmtp_write_cb, (void*)bmtp);
    bmtp->event_read = event_new(bmtp->base, bmtp->fd, EV_TIMEOUT | EV_READ, bmtp_read_cb, (void*)bmtp);
    
    bmtp->on_close   = bmtp_on_close;
    bmtp->on_error   = bmtp_on_error;
    bmtp->on_open    = bmtp_on_open;
    bmtp->on_pub     = bmtp_on_pub;
    bmtp->on_ack     = bmtp_on_ack;

    list_init(&bmtp->send_queue.head);
    list_init(&bmtp->ack_queue.head);

    bmtp_mutex_init( bmtp );

    bmtp_log_add(bmtp,"connect to %s:%d, ok.\n", ip, port);
    return bmtp;
}

BMTP_API int bmtp_reconnect( BMTP *p ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;
#ifdef WIN32
    WSADATA wsaData;
    while (WSAStartup(MAKEWORD(2, 2), &wsaData) != NO_ERROR) {
        Sleep( 1000 );
    }
#endif
    bmtp->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if( connect(bmtp->fd, (struct sockaddr*)&bmtp->server_addr, sizeof(bmtp->server_addr)) == -1 ) {
        bmtp_log_add(bmtp,"reconnect to %s:%d, failed.\n",
            inet_ntoa(bmtp->server_addr.sin_addr),
            ntohs(bmtp->server_addr.sin_port));
        return 1;
    }

    if( bmtp->secure ) {
        bmtp->ssl = SSL_new(ssl_ctx);
        if(!bmtp->ssl) {
            evutil_closesocket(bmtp->fd);
            bmtp_log_add(bmtp,"SSL_new, failed.\n");
            return 2;
        }
        
        SSL_set_fd(bmtp->ssl, bmtp->fd);
        
        if( SSL_connect(bmtp->ssl) != 1 ) {
            evutil_closesocket(bmtp->fd);
            bmtp_log_add(bmtp,"SSL_connect, failed.\n");
            return 3;
        }
    }

    evutil_make_socket_nonblocking( bmtp->fd );

    bmtp->base = event_base_new();
    bmtp->event_write = event_new(bmtp->base, bmtp->fd, EV_WRITE, bmtp_write_cb, (void*)bmtp);
    bmtp->event_read = event_new(bmtp->base, bmtp->fd, EV_READ, bmtp_read_cb, (void*)bmtp);

    //list_init(&bmtp->wait_queue.head);
    //list_init(&bmtp->send_queue.head);
    //list_init(&bmtp->ack_queue.head);

    //bmtp_open(bmtp);
    
    bmtp_log_add(bmtp,"reconnect to %s:%d, ok.\n",
        inet_ntoa(bmtp->server_addr.sin_addr),
        ntohs(bmtp->server_addr.sin_port));
    
    return 0;
}

BMTP_API int bmtp_set_on_open( BMTP *p, void( *on_open )( BMTP * ) ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;

    bmtp->on_open = on_open;

    return BMTP_OK;
}

BMTP_API int bmtp_set_on_close( BMTP *p, void( *on_close )( BMTP * ) ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;

    bmtp->on_close = on_close;

    return BMTP_OK;
}

BMTP_API int bmtp_set_on_error( BMTP *p, void( *on_error )( BMTP *, int err_no ) ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;

    bmtp->on_error = on_error;

    return BMTP_OK;
}

BMTP_API int bmtp_set_on_pub( BMTP *p, void( *on_pub )( BMTP *, BMTP_MSG * ) ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;

    bmtp->on_pub = on_pub;

    return BMTP_OK;
}

BMTP_API int bmtp_set_on_ack( BMTP *p, void( *on_ack )( BMTP *, BMTP_PACKAGE *, int ) ) {
    struct bmtp_context *bmtp = ( struct bmtp_context* )p;

    bmtp->on_ack = on_ack;

    return BMTP_OK;
}

BMTP_API BMTP_PACKAGE* bmtp_package_new() {
    struct bmtp_package *msg = (struct bmtp_package *)calloc(1, sizeof(struct bmtp_package));

    if( msg ) {
        list_init(&msg->param.head);
    }
    
    return msg;
}

BMTP_API void bmtp_package_delete( BMTP_PACKAGE *m ) {
    struct bmtp_package *msg = (struct bmtp_package*)m;
    
    struct bmtp_param_list *param;
    while(!list_empty(&msg->param.head)) {
        param = list_first_entry(&msg->param.head, struct bmtp_param_list, head);
        list_del(&param->head);
        free(param);
    }
    
    if( msg->send_buf ) {
        free(msg->send_buf);
    }
    
    free(msg);
}

BMTP_API int bmtp_package_add_param( BMTP_PACKAGE *m, int key, int type, unsigned long long int l, const char *s ) {
    struct bmtp_package *msg = (struct bmtp_package*)m;
    
    if( !msg->op_code ) {
        return BMTP_OK;
    }
    
    struct bmtp_param_list *param = (struct bmtp_param_list*)calloc(1, sizeof(struct bmtp_param_list));
    if( !param ) {
        return BMTP_OUT_OF_MEMORY;
    }
    
    param->key = key;
    param->key_type = type;

    msg->param_len ++;
    key >>= 4;

    while(key) {
        msg->param_len ++;
        key >>= 7;
    }
    
    switch( param->key_type ) {
        case TYPE_STRING:
            param->value.s = (unsigned char *)s;
            param->value.l = l;
            
            do {
                msg->param_len ++;
            } while(l >>= 7);
            msg->param_len += param->value.l;
            break;
        case TYPE_BOOL:
            break;
        case TYPE_VARINT:
            param->value.l = l;
            
            do {
                msg->param_len ++;
            } while(l >>= 7);
            break;
        case TYPE_64BIT:
            param->value.l = l;
            
            msg->param_len += 8;
            break;
        default:
            break;
    }
    
    list_add_tail(&param->head, &msg->param.head);
    
    if( msg->op_code == OP_PUB && param->key == PARAM_PUB_STREAM_ID ) {
        msg->stream_id = param->value.l;
    }
    
    return BMTP_OK;
}

BMTP_API int bmtp_package_set_opcode( BMTP_PACKAGE *m, int op_code, int op_type, unsigned long long int l) {
    struct bmtp_package *msg = (struct bmtp_package*)m;
    
    msg->op_code = op_code;
    msg->op_type = op_type;
    msg->op_value.l = l;
    
    return BMTP_OK;
}

BMTP_API int bmtp_package_set_payload( BMTP_PACKAGE *m, unsigned int l, const char *s ) {
    struct bmtp_package *msg = (struct bmtp_package*)m;
    
    msg->payload = (unsigned char *)s;
    msg->payload_length = l;
    
    return BMTP_OK;
}

BMTP_API int bmtp_package_send( BMTP *p, BMTP_PACKAGE *m ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;
    struct bmtp_package *msg = (struct bmtp_package*)m;
    
    if( msg->send_buf == NULL ) {
        msg->send_buf = malloc(msg->param_len + msg->payload_length + 30);
        
        int len = 0;
        int op_code = msg->op_code;

        msg->send_buf[len++] = ( ( ( op_code & 0xF ) | 0x10 ) << 3 ) + msg->op_type;
        op_code >>= 4;

        while(op_code) {
            msg->send_buf[len++] = ( op_code & 0x7F ) | 0x80;
            op_code >>= 7;
        };

        msg->send_buf[len-1] &= 0x7F;

        if( msg->op_code == OP_CONN ) {
            msg->send_buf[len++] = 'B';
            msg->send_buf[len++] = 'M';
            msg->send_buf[len++] = 'T';
            msg->send_buf[len++] = 'P';
        }

        int l = msg->op_value.l;
        
        switch( msg->op_type ) {
            case TYPE_BOOL:
                break;
            case TYPE_VARINT:
                do {
                    msg->send_buf[len++] = ( l & 0x7F ) | 0x80;
                } while( l >>= 7 );
                msg->send_buf[len-1] &= 0x7F;
                break;
            case TYPE_64BIT:
                *((unsigned long long int *)(msg->send_buf + len)) = l;
                len += 8;
                break;
            case TYPE_STRING:
                l = msg->param_len;
                do {
                    msg->send_buf[len++] = ( l & 0x7F ) | 0x80;
                } while( l >>= 7 );
                msg->send_buf[len-1] &= 0x7F;
                break;
            default:
                break;
        }
        
        struct bmtp_param_list *param;
        list_for_each_entry(param, struct bmtp_param_list, &msg->param.head, head) {
            int key = param->key;
            
            msg->send_buf[len++] = ( ( ( key & 0xF ) | 0x10 ) << 3 ) + param->key_type;
            key >>= 4;

            while(key) {
                msg->send_buf[len++] = ( key & 0x7F ) | 0x80;
                key >>= 7;
            };

            msg->send_buf[len-1] &= 0x7F;

            int l = param->value.l;

            switch( param->key_type ) {
                case TYPE_BOOL:
                    break;
                case TYPE_VARINT:
                    do {
                         msg->send_buf[len++] = ( l & 0x7F ) | 0x80;
                    } while( l >>= 7 );
                    msg->send_buf[len-1] &= 0x7F;
                    break;
                case TYPE_64BIT:
                    *((unsigned long long int *)(msg->send_buf + len)) = l;
                    len += 8;
                    break;
                case TYPE_STRING:
                    do {
                         msg->send_buf[len++] = ( l & 0x7F ) | 0x80;
                    } while( l >>= 7 );
                    msg->send_buf[len-1] &= 0x7F;

                    memcpy(msg->send_buf + len, param->value.s, param->value.l);
                    len += param->value.l;
                    break;
                default:
                    break;
            }
        }
        
        if( msg->payload ) {
            int l = msg->payload_length;
            do {
                 msg->send_buf[len++] = ( l & 0x7F ) | 0x80;
            } while( l >>= 7 );
            msg->send_buf[len-1] &= 0x7F;
            
            memcpy(msg->send_buf + len, msg->payload, msg->payload_length);
            len += msg->payload_length;
        }
        
        msg->send_buf_len = len;
        msg->send_offset = 0;
    }
    
    
    bmtp_mutex_lock( bmtp );
    
    if( msg->op_code == OP_CONN ) {
        list_add(&msg->head, &bmtp->send_queue.head);
    } else {
        list_add_tail(&msg->head, &bmtp->send_queue.head);
    }

    if( bmtp->event_write ) {
        struct timeval tv = { 15, 0 };
        event_add( bmtp->event_write, &tv );
    }

    bmtp_mutex_unlock( bmtp );

    return BMTP_OK;
}

BMTP_API int bmtp_conn( BMTP *p, const char *auth, int len ) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;

    BMTP_PACKAGE *msg = bmtp_package_new();
    if(!msg) {
        return BMTP_OUT_OF_MEMORY;
    }
    
    bmtp_package_set_opcode(msg, OP_CONN, TYPE_STRING, 0);
    if( auth && len ) {
        bmtp_package_add_param(msg, PARAM_CONN_AUTH, TYPE_STRING, len, auth);
    }
    
    bmtp_package_send(bmtp, msg);

    struct timeval tv = { 15, 0 };
    event_add(bmtp->event_write, &tv);
    tv.tv_sec = 30;
    event_add(bmtp->event_read, &tv);

//    /*
#ifdef WIN32
    _beginthreadex(NULL, 0, bmtp_main_thread, (LPVOID)bmtp, 0, NULL);
#else
    pthread_t tid;
    pthread_create(&tid, 0, bmtp_main_thread, bmtp);
#endif
//     */
//    bmtp_main_thread(bmtp);

    return BMTP_OK;    
}

BMTP_API int bmtp_pub(BMTP *p, BMTP_MSG *msg) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;
    
    BMTP_PACKAGE *pkg = bmtp_package_new();
    if(!pkg) {
        return BMTP_OUT_OF_MEMORY;
    }
    
    bmtp_package_set_opcode(pkg, OP_PUB, TYPE_STRING, 0);

    bmtp_package_add_param(pkg, PARAM_PUB_CONSUMER_ID, TYPE_VARINT, msg->consumer_id, NULL);
    bmtp_package_add_param(pkg, PARAM_PUB_TYPE, TYPE_VARINT, msg->type, NULL);
    bmtp_package_add_param(pkg, PARAM_PUB_EXPIRE, TYPE_VARINT, msg->expire, NULL);
    if( msg->stream_id ) {
        bmtp_package_add_param(pkg, PARAM_PUB_STREAM_ID, TYPE_VARINT, msg->stream_id, NULL);
    }
    
    bmtp_package_set_payload(pkg, msg->data_len, msg->data);
    
    bmtp_package_send(bmtp, pkg);

    return 0;
}

BMTP_API int bmtp_sub(BMTP*p, unsigned long long int channel_id) {
    struct bmtp_context *bmtp = (struct bmtp_context*)p;
    
    bmtp_log_add(bmtp,"try to sub %lld@%s:%d.\n",
        channel_id,
        inet_ntoa(bmtp->server_addr.sin_addr),
        ntohs(bmtp->server_addr.sin_port));

    BMTP_PACKAGE *msg = bmtp_package_new();
    if(!msg) {
        return BMTP_OUT_OF_MEMORY;
    }
    
    bmtp_package_set_opcode(msg, OP_SUB, TYPE_VARINT, channel_id);

    bmtp_package_send(bmtp, msg);
    
    return 0;
}

BMTP_API int bmtp_ping( BMTP *p ) {
    struct bmtp_context *bmtp = ( struct bmtp_context* )p;
    
    BMTP_PACKAGE *msg = bmtp_package_new();
    if(!msg) {
        return BMTP_OUT_OF_MEMORY;
    }
    
    bmtp_package_set_opcode(msg, OP_PING, TYPE_BOOL, 0);
    
    bmtp_package_send(bmtp, msg);
    
    return BMTP_OK;
}

BMTP_API int bmtp_debug( BMTP*p ) {
    struct bmtp_context *bmtp = ( struct bmtp_context* )p;

    printf( "%d %d\n", bmtp->recv_length, bmtp->recv_offset );

    unsigned int i = 0;
    while( i < bmtp->recv_length ) {
        printf( "%x ", bmtp->recv_buf[i] );
        i++;
    }
    return BMTP_OK;
}